1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.base.Preconditions.checkNotNull;
20
21 import com.google.common.base.Preconditions;
22 import com.google.common.collect.Queues;
23
24 import java.util.Queue;
25 import java.util.concurrent.Executor;
26 import java.util.logging.Level;
27 import java.util.logging.Logger;
28
29 import javax.annotation.concurrent.GuardedBy;
30
31
32
33
34
35
36
37
38 final class ListenerCallQueue<L> implements Runnable {
39
40 private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName());
41
42 abstract static class Callback<L> {
43 private final String methodCall;
44
45 Callback(String methodCall) {
46 this.methodCall = methodCall;
47 }
48
49 abstract void call(L listener);
50
51
52 void enqueueOn(Iterable<ListenerCallQueue<L>> queues) {
53 for (ListenerCallQueue<L> queue : queues) {
54 queue.add(this);
55 }
56 }
57 }
58
59 private final L listener;
60 private final Executor executor;
61
62 @GuardedBy("this") private final Queue<Callback<L>> waitQueue = Queues.newArrayDeque();
63 @GuardedBy("this") private boolean isThreadScheduled;
64
65 ListenerCallQueue(L listener, Executor executor) {
66 this.listener = checkNotNull(listener);
67 this.executor = checkNotNull(executor);
68 }
69
70
71 synchronized void add(Callback<L> callback) {
72 waitQueue.add(callback);
73 }
74
75
76 void execute() {
77 boolean scheduleTaskRunner = false;
78 synchronized (this) {
79 if (!isThreadScheduled) {
80 isThreadScheduled = true;
81 scheduleTaskRunner = true;
82 }
83 }
84 if (scheduleTaskRunner) {
85 try {
86 executor.execute(this);
87 } catch (RuntimeException e) {
88
89 synchronized (this) {
90 isThreadScheduled = false;
91 }
92
93 logger.log(Level.SEVERE,
94 "Exception while running callbacks for " + listener + " on " + executor,
95 e);
96 throw e;
97 }
98 }
99 }
100
101 @Override public void run() {
102 boolean stillRunning = true;
103 try {
104 while (true) {
105 Callback<L> nextToRun;
106 synchronized (ListenerCallQueue.this) {
107 Preconditions.checkState(isThreadScheduled);
108 nextToRun = waitQueue.poll();
109 if (nextToRun == null) {
110 isThreadScheduled = false;
111 stillRunning = false;
112 break;
113 }
114 }
115
116
117 try {
118 nextToRun.call(listener);
119 } catch (RuntimeException e) {
120
121 logger.log(Level.SEVERE,
122 "Exception while executing callback: " + listener + "." + nextToRun.methodCall,
123 e);
124 }
125 }
126 } finally {
127 if (stillRunning) {
128
129
130
131 synchronized (ListenerCallQueue.this) {
132 isThreadScheduled = false;
133 }
134 }
135 }
136 }
137 }